👋 환영합니다!
📢 페이스북에 공유하기
### 기존 코드 활용 방식 PS \03ST\0316PythonAI\pxAgent> python run.py == px agent (YouTube sermon summarizer) == Type 'exit' to quit. You: exit \03ST\0316PythonAI\pxAgent> { #C}
PXAGENT/
├─ agent/
│ ├─ __init__.py
│ ├─ agent.py 👈 여기!
│ ├─ tools/
│ └─ ...
├─ run.py (또는 pxagent002.py / run_pxagent002.py)
run.py
from agent.agent import PxAgent # 절대 경로 임포트 사용
def main():
    """Run the interactive px agent console loop.

    Reads one line per turn from stdin, forwards it to ``PxAgent.run`` and
    prints the reply. The loop ends on an empty line, on ``exit``/``quit``,
    or when stdin is closed / the user presses Ctrl+C at the prompt.
    """
    print("\n== px agent (YouTube sermon summarizer) ==\nType 'exit' to quit.\n")
    agent = PxAgent()
    while True:
        try:
            user = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl+C at the prompt: leave the loop cleanly
            # instead of dying with a traceback.
            print()
            break
        if not user or user.lower() in {"exit", "quit"}:
            break
        # agent.run is wrapped with tenacity, so it is retried on errors
        # before an exception reaches this handler.
        try:
            result = agent.run(user)
            print("\nAssistant:\n" + result.output_text + "\n")
        except Exception as e:
            print(f"\n[ERROR] 에이전트 실행 중 예외 발생: {e}\n")


if __name__ == "__main__":
    main()
agent/agent.py
# agent/agent.py
from __future__ import annotations
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, field
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
import re
import os
import smtplib
from email.message import EmailMessage
from mimetypes import guess_type
from datetime import datetime
import openai
from .config import settings
from .memory import MemoryStore
from .safety import redact_pii
from .tool_router import run_tool
# =========================
# 유틸: 바탕화면(Desktop) 경로 탐지
# - 우선순위:
# 1) Windows API(SHGetFolderPathW) - 실제 시스템 데스크톱
# 2) 레지스트리(Shell Folders)
# 3) 환경변수 기반 후보들(Desktop, 바탕화면, OneDrive/Desktop, OneDrive/바탕화면)
# 4) 마지막 폴백: 홈 디렉토리
# =========================
def _desktop_dir() -> str:
# 1) Windows API
try:
import ctypes
from ctypes import wintypes
CSIDL_DESKTOPDIRECTORY = 0x10 # 실제 데스크톱 폴더
SHGFP_TYPE_CURRENT = 0
buf = ctypes.create_unicode_buffer(wintypes.MAX_PATH)
if ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_DESKTOPDIRECTORY, None, SHGFP_TYPE_CURRENT, buf) == 0:
p = buf.value
if p and os.path.isdir(p):
return p
except Exception:
pass
# 2) 레지스트리 (Windows만)
try:
import winreg # type: ignore
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders") as key:
val, _ = winreg.QueryValueEx(key, "Desktop")
if val and os.path.isdir(val):
return val
except Exception:
pass
# 3) 일반 후보들
home = os.path.expanduser("~")
candidates = [
os.path.join(home, "Desktop"),
os.path.join(home, "바탕화면"),
os.path.join(home, "OneDrive", "Desktop"),
os.path.join(home, "OneDrive", "바탕화면"),
# Microsoft 365/회사 테넌트 OneDrive 표기 변형도 시도
os.path.join(home, "OneDrive - Microsoft", "Desktop"),
os.path.join(home, "OneDrive - Microsoft", "바탕화면"),
]
for p in candidates:
if os.path.isdir(p):
return p
# 4) 폴백
return home
# =========================
# RTF 유니코드 유틸 (python-docx 없어도 한글 완벽)
# =========================
def _signed16(n: int) -> int:
n = n & 0xFFFF
return n - 0x10000 if n >= 0x8000 else n
def _rtf_escape_char(c: str) -> str:
if c == "\\":
return r"\\"
if c == "{":
return r"\{"
if c == "}":
return r"\}"
code = ord(c)
if 0x20 <= code <= 0x7E:
return c
if code > 0xFFFF:
u = code - 0x10000
high = 0xD800 + (u >> 10)
low = 0xDC00 + (u & 0x3FF)
return rf"\u{_signed16(high)}?\u{_signed16(low)}?"
return rf"\u{_signed16(code)}?"
def _text_to_rtf_body(text: str) -> str:
parts = []
for para in text.split("\n\n"):
# [수정] 각 문단을 다시 줄바꿈(\n) 기준으로 분리
lines_in_para = []
for ln in para.splitlines():
converted = "".join(_rtf_escape_char(c) for c in ln)
lines_in_para.append(converted)
# [수정] \line(줄바꿈)으로 줄을 연결하고 \par(문단 끝) 추가
parts.append(r"\line ".join(lines_in_para) + r"\par ")
return "\n".join(parts)
def _rtf_wrap(body: str) -> str:
return (
r"{\rtf1\ansi\deff0"
r"{\fonttbl{\f0 Arial;}}"
r"\viewkind4\uc1\pard\f0\fs24 "
+ body +
r"}"
)
def _safe_filename(stem: str) -> str:
stem = stem.strip()
stem = re.sub(r"[^\w\-]+", "_", stem)[:60] or "pxdoc"
return stem
# =========================
# 시간 파서
# =========================
_TIME = r"(\d{1,2}):(\d{2}):(\d{2})"
def _norm(h: str, m: str, s: str) -> str:
return f"{int(h):02d}:{int(m):02d}:{int(s):02d}"
def _parse_times_freeform(s: str) -> Tuple[Optional[str], Optional[str]]:
if not s:
return (None, None)
s = s.strip()
m = re.search(rf"{_TIME}\s*[-~]\s*{_TIME}", s)
if m:
a = _norm(*m.groups()[0:3])
b = _norm(*m.groups()[3:6])
return (a, b)
m = re.search(rf"(?:^|[\s])-(?P<h>\d{{1,2}}):(?P<m>\d{{2}}):(?P<s>\d{{2}})", s)
if m:
b = _norm(m.group("h"), m.group("m"), m.group("s"))
return (None, b)
m = re.search(rf"(?:끝|end)\s+{_TIME}", s, flags=re.I)
if m:
b = _norm(*m.groups()[0:3])
return (None, b)
m = re.search(rf"(?:시작|start)\s+{_TIME}", s, flags=re.I)
if m:
a = _norm(*m.groups()[0:3])
return (a, None)
m = re.search(rf"{_TIME}", s)
if m:
a = _norm(*m.groups()[0:3])
return (a, None)
return (None, None)
def _validate_time_order(s: Optional[str], e: Optional[str]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
def to_sec(t: Optional[str]) -> Optional[int]:
if not t:
return None
try:
hh, mm, ss = map(int, t.split(":"))
except Exception:
return None
if not (0 <= mm < 60 and 0 <= ss < 60 and hh >= 0):
return None
return hh * 3600 + mm * 60 + ss
ss, ee = to_sec(s), to_sec(e)
if s and ss is None:
return None, None, "시간 형식이 잘못되었습니다(분/초는 00–59여야 합니다)."
if e and ee is None:
return None, None, "시간 형식이 잘못되었습니다(분/초는 00–59여야 합니다)."
if ss is not None and ee is not None and ss >= ee:
return None, None, "시작 시간이 끝 시간보다 크거나 같습니다."
return s, e, None
# =========================
# 생성 옵션
# =========================
@dataclass
class GenOptions:
    """Options controlling which sections are generated and how large they are."""

    # Section titles to produce (every instance gets its own list)
    sections: List[str] = field(
        default_factory=lambda: list(
            (
                "시작 기도문",
                "아이스브레이크",
                "본문 요약",
                "본문 이해 질문",
                "적용 질문",
                "끝나는 기도문",
            )
        )
    )
    # (min, max) line counts for the opening and the closing prayer
    start_prayer_lines: Tuple[int, int] = (8, 10)
    end_prayer_lines: Tuple[int, int] = (7, 10)
    # Number of items requested per list-style section
    icebreakers_count: int = 3
    understanding_q: int = 7
    application_q: int = 7
    # (min, max) character count requested for the summary section
    summary_chars: Tuple[int, int] = (5000, 7000)
    # One of: short | normal | long
    length_hint: str = "normal"
@dataclass
class AgentResult:
    """Outcome of one agent turn."""

    # Text shown to the user
    output_text: str
    # One dict per tool invocation made during the turn
    tool_runs: List[Dict[str, Any]]
class PxAgent:
"""
- 유튜브 링크(+시간) 전사 → 지정 섹션만 생성(‘기도문만’ 등 옵션 기억)
- 결과는 즉시 바탕화면(또는 로컬 Desktop)에 RTF/DOC(=RTF) 저장, python-docx 있으면 DOCX도 저장
- 저장 직후 settings의 SMTP로 이메일 첨부 전송(설정 없으면 생략)
"""
def __init__(self, memory: Optional[MemoryStore] = None):
    """Set up the client, conversation memory and tunables.

    Args:
        memory: Existing conversation store to reuse; a fresh
            ``MemoryStore`` is created when none is given.
    """

    def _int_setting(name: str, fallback: int) -> int:
        # Read an optional integer from settings, using the fallback when absent
        return int(getattr(settings, name, fallback))

    self.memory = memory if memory else MemoryStore()
    self.client = openai.OpenAI(api_key=settings.openai_api_key)
    self._yt_url_re = re.compile(
        r"(https?://(?:www\.)?youtube\.com/\S+|https?://youtu\.be/\S+)", re.I
    )

    # Conversation state carried between turns
    self.pending_url: Optional[str] = None
    self._pending_opts: Optional[GenOptions] = None

    self._model = settings.model
    self._max_out = _int_setting("max_output_tokens", 3072)
    self._mem_sum_tokens = _int_setting("memory_summary_tokens", 800)
    self._chunk_sec = _int_setting("youtube_chunk_duration_sec", 600)
    self._overlap_sec = _int_setting("youtube_overlap_sec", 1)

    # Probe once whether python-docx can be imported
    try:
        from docx import Document  # noqa: F401
    except Exception:
        self._has_docx = False
    else:
        self._has_docx = True
# ----------------- 내부 유틸 -----------------
def _set_pending_url(self, url: str) -> None:
self.pending_url = url
def _get_pending_url(self) -> Optional[str]:
return self.pending_url
def _parse_generation_options(self, text: str) -> GenOptions:
    """Derive generation options from the user's wording.

    Recognises "only this section" requests, item counts, prayer line
    ranges ("N~M줄") and overall length hints. Anything not mentioned keeps
    the ``GenOptions`` default.

    URLs are removed before matching: the English keywords are searched as
    plain substrings, so a link such as ``youtube.com/shorts/...`` or a
    video id containing "long" would otherwise change the options.
    """
    text = re.sub(r"https?://\S+", " ", text)
    t = text.lower()
    opt = GenOptions()

    # --- single-section requests (a later match overrides an earlier one) ---
    if ("기도문만" in text) or ("start prayer only" in t):
        if "시작" in text or "start" in t:
            opt.sections = ["시작 기도문"]
        else:
            # explicit "끝/마무리/end" and the unspecified case both mean the closing prayer
            opt.sections = ["끝나는 기도문"]
    if ("아이스브레이크만" in text) or ("icebreakers only" in t):
        opt.sections = ["아이스브레이크"]
    if ("요약만" in text) or ("본문 요약만" in text) or ("summary only" in t):
        opt.sections = ["본문 요약"]
    if "이해 질문만" in text:
        opt.sections = ["본문 이해 질문"]
    if "적용 질문만" in text:
        opt.sections = ["적용 질문"]

    # --- item counts (at least 1) ---
    m = re.search(r"아이스브레이크\s*(\d+)", text)
    if m:
        opt.icebreakers_count = max(1, int(m.group(1)))
    m = re.search(r"(이해\s*질문|본문\s*이해\s*질문)\s*(\d+)", text)
    if m:
        opt.understanding_q = max(1, int(m.group(2)))
    m = re.search(r"(적용\s*질문)\s*(\d+)", text)
    if m:
        opt.application_q = max(1, int(m.group(2)))

    # --- prayer line range; applies to the closing prayer only when the
    #     matched keyword itself says "끝"/"마무리" ---
    m = re.search(r"(기도문|끝나는\s*기도문|마무리\s*기도)\s*(\d+)\s*~\s*(\d+)\s*줄", text)
    if m:
        a, b = int(m.group(2)), int(m.group(3))
        bounds = (min(a, b), max(a, b))
        if "끝" in m.group(1) or "마무리" in m.group(1):
            opt.end_prayer_lines = bounds
        else:
            opt.start_prayer_lines = bounds

    # --- overall length hint ("long" wins when both are present) ---
    if ("짧게" in text) or ("간단히" in text) or ("short" in t):
        opt.length_hint = "short"
        opt.summary_chars = (1500, 2200)
    if ("길게" in text) or ("자세히" in text) or ("long" in t):
        opt.length_hint = "long"
        opt.summary_chars = (7000, 9000)
    return opt
def _has_explicit_prefs(self, text: str) -> bool:
t = text.lower()
markers = [
"기도문만", "끝나는 기도문만", "아이스브레이크만", "요약만",
"본문 요약만", "summary only", "icebreakers only",
"짧게", "간단히", "short", "길게", "자세히", "long",
"아이스브레이크", "이해 질문", "적용 질문", "기도문", "마무리 기도",
]
if any(m in text for m in markers) or any(m in t for m in markers):
return True
if re.search(r"아이스브레이크\s*\d+", text): return True
if re.search(r"(이해\s*질문|본문\s*이해\s*질문)\s*\d+", text): return True
if re.search(r"(적용\s*질문)\s*\d+", text): return True
if re.search(r"(기도문|끝나는\s*기도문|마무리\s*기도)\s*\d+\s*~\s*\d+\s*줄", text): return True
return False
def _remember_prefs_if_any(self, text: str) -> None:
if self._has_explicit_prefs(text):
self._pending_opts = self._parse_generation_options(text)
def _resolve_generation_options(self, text: str) -> GenOptions:
current = self._parse_generation_options(text)
if not self._has_explicit_prefs(text) and self._pending_opts:
use = self._pending_opts
self._pending_opts = None
return use
if self._has_explicit_prefs(text):
self._pending_opts = None
return current
return current
def _filter_to_allowed_sections(self, text: str, allowed: List[str]) -> str:
all_headers = ["시작 기도문", "아이스브레이크", "본문 요약", "본문 이해 질문", "적용 질문", "끝나는 기도문"]
forbidden = [h for h in all_headers if h not in allowed]
cleaned = text
for h in forbidden:
pattern = rf"\n?#\s*(?:\d+\)\s*)?{re.escape(h)}[^\n]*\n.*?(?=\n#\s|\Z)"
cleaned = re.sub(pattern, "", cleaned, flags=re.S | re.I)
cleaned = re.sub(r"\n{3,}", "\n\n", cleaned).strip()
return cleaned
# ----------------- 파일 내보내기 + 이메일 -----------------
# [새로 추가되는 메서드] 전사문 원본 저장용
def _save_raw_transcript(self, text: str, title_hint: str = "raw_transcript") -> Optional[str]:
    """Write the raw transcript to a timestamped UTF-8 ``.txt`` file on the Desktop.

    This is best effort: any failure is swallowed and reported as ``None``
    so the calling workflow can continue.

    Returns:
        Path of the written file, or ``None`` when saving failed.
    """
    try:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        stem = f"{_safe_filename(title_hint)}_{stamp}"
        target = os.path.join(_desktop_dir(), f"{stem}.txt")
        # UTF-8 so that Korean text is stored correctly
        with open(target, "w", encoding="utf-8") as handle:
            handle.write(text)
    except Exception:
        # Deliberately silent; add logging here if diagnostics are needed.
        return None
    return target
# ----------------- 파일 내보내기 + 이메일 -----------------
def _export_files(self, text: str, title_hint: str = "px_sermon") -> Dict[str, str]:
    """Save the result text on the Desktop in several formats.

    ``.rtf`` and ``.doc`` are always written; both contain the same RTF
    payload. ``.docx`` is added when python-docx is available; a failure
    while producing it is ignored.

    Returns:
        Mapping of ``"rtf"``, ``"doc"`` and optionally ``"docx"`` to paths.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    stem = f"{_safe_filename(title_hint)}_{stamp}"
    folder = _desktop_dir()

    payload = _rtf_wrap(_text_to_rtf_body(text))
    written: Dict[str, str] = {}

    # The payload is pure ASCII by construction (non-ASCII is \u-escaped),
    # so strict ASCII encoding is expected to succeed.
    for ext in ("rtf", "doc"):
        target = os.path.join(folder, f"{stem}.{ext}")
        with open(target, "w", encoding="ascii", errors="strict") as handle:
            handle.write(payload)
        written[ext] = target

    if not self._has_docx:
        return written

    try:
        from docx import Document

        docx_target = os.path.join(folder, f"{stem}.docx")
        document = Document()
        for paragraph in text.split("\n\n"):
            document.add_paragraph(paragraph)
        document.save(docx_target)
        written["docx"] = docx_target
    except Exception:
        pass
    return written
def _send_email_with_attachments(
self,
subject: str,
body: str,
to_email: str,
attachments: List[str],
smtp_host: str,
smtp_port: int,
smtp_user: str,
smtp_pass: str,
from_email: Optional[str] = None,
use_tls: bool = True,
) -> Tuple[bool, Optional[str]]:
msg = EmailMessage()
msg["Subject"] = subject
msg["To"] = to_email
msg["From"] = from_email or smtp_user
msg.set_content(body)
for path in attachments:
try:
ctype, _ = guess_type(path)
maintype, subtype = (ctype.split("/", 1) if ctype else ("application", "octet-stream"))
with open(path, "rb") as fp:
data = fp.read()
filename = os.path.basename(path)
msg.add_attachment(data, maintype=maintype, subtype=subtype, filename=filename)
except Exception as e:
return False, f"첨부 실패: {path} ({e})"
try:
if use_tls:
with smtplib.SMTP(smtp_host, smtp_port) as s:
s.starttls()
s.login(smtp_user, smtp_pass)
s.send_message(msg)
else:
with smtplib.SMTP_SSL(smtp_host, smtp_port) as s:
s.login(smtp_user, smtp_pass)
s.send_message(msg)
return True, None
except Exception as e:
return False, f"이메일 전송 실패: {e}"
# ----------------- YouTube 워크플로우 -----------------
def _maybe_youtube_workflow(self, text: str) -> Optional[AgentResult]:
    """Handle a turn that concerns a YouTube video, if it does.

    Behaviour:
      * A URL in ``text`` takes precedence over a previously stored one.
      * URL (new or pending) plus a time range -> transcribe, generate,
        export and email.
      * URL without a time -> remember the URL and ask for the time range.
      * Neither a usable URL nor a time for a pending URL -> ``None``,
        leaving the reply to the caller.

    Returns:
        The ``AgentResult`` for this turn, or ``None`` when not applicable.
    """
    # Remember stated preferences so a later time-only message can use them
    self._remember_prefs_if_any(text)

    url_match = self._yt_url_re.search(text)
    url = url_match.group(1) if url_match else self._get_pending_url()
    if not url:
        return None

    s, e = _parse_times_freeform(text)
    if s or e:
        s, e, err = _validate_time_order(s, e)
        if err:
            if url_match:
                # Keep the link so the user only has to resend the times
                self._set_pending_url(url)
            out = f"시간 입력 오류: {err}"
            self._remember(text, out)
            return AgentResult(output_text=out, tool_runs=[])
        return self._run_youtube_pipeline(url, s, e, text)

    if not url_match:
        # A URL is pending but this message carries neither a time nor a URL
        return None

    # URL only -> ask for the time range
    self._set_pending_url(url)
    ask = (
        "설교 **시작 시간**과 **끝 시간**을 알려주세요.\n"
        "- 형식: HH:MM:SS (예: 00:27:15 01:12:40, 또는 00:27:15-01:12:40)\n"
        "- 시간 1개만 주시면 시작으로 간주하고 **영상 끝**까지 처리합니다.\n"
        "- 끝만 주시려면 '-01:12:40' 또는 '끝 01:12:40'처럼 보내주세요."
    )
    self._remember(text, ask)
    return AgentResult(output_text=ask, tool_runs=[])


def _run_youtube_pipeline(self, url: str, s: Optional[str], e: Optional[str], text: str) -> AgentResult:
    """Transcribe ``url`` between ``s`` and ``e``, then generate, export and email."""
    stt_args = {
        "url": url,
        "sermon_start": s,
        "sermon_end": e,
        "chunk_duration_sec": self._chunk_sec,
        "overlap_sec": self._overlap_sec,
    }
    yt = run_tool("youtube_stt", {**stt_args, "lang_hint": "ko"})
    tool_runs = [{
        "name": "youtube_stt",
        "args": stt_args,
        "result": {"ok": yt.get("ok", False), "error": yt.get("error")},
    }]
    if not yt.get("ok"):
        # The reply invites the user to resend only the times, so keep the URL
        self._set_pending_url(url)
        out = f"전사 실패: {yt.get('error')}\n시간을 다시 주시거나, 다른 영상 링크를 주세요."
        self._remember(text, out)
        return AgentResult(output_text=out, tool_runs=tool_runs)

    transcript = yt["result"]["text"]

    # Keep a copy of the raw transcript (best effort; None when saving failed)
    saved_txt_path = self._save_raw_transcript(transcript, title_hint="youtube_raw")
    saved_msg_extra = (
        f"\n(참고: 전사문 원본도 저장되었습니다: {os.path.basename(saved_txt_path)})"
        if saved_txt_path else ""
    )

    opts = self._resolve_generation_options(text)
    out = self._compose_px_doc(transcript, opts)
    out = self._filter_to_allowed_sections(out, opts.sections)

    files = self._export_files(out, title_hint="sermon_output")
    saved_msg = " \n".join(f"- {k.upper()}: {v}" for k, v in files.items())
    footer = (
        "\n\n---\n"
        "바탕화면에 파일을 저장했습니다:\n" + saved_msg +
        saved_msg_extra +
        "\n\n이 파일들은 설정된 메일로도 전송합니다."  # the raw .txt is not among the attachments
    )
    out = out + footer + self._email_exported_files(files)

    # The request is complete: drop per-request state on every success path
    self.pending_url = None
    self._pending_opts = None
    self._remember(text, out)
    return AgentResult(output_text=out, tool_runs=tool_runs)


def _email_exported_files(self, files: Dict[str, str]) -> str:
    """Email the exported files when SMTP is configured; return the status line."""
    recipient = getattr(settings, "recipient_email", None)
    smtp_host = getattr(settings, "smtp_host", None)
    smtp_port = int(getattr(settings, "smtp_port", 587))
    smtp_user = getattr(settings, "smtp_user", None)
    smtp_pass = getattr(settings, "smtp_pass", None)
    from_email = getattr(settings, "from_email", None)

    if not (recipient and smtp_host and smtp_user and smtp_pass):
        return "\n(참고) settings에 SMTP/recipient_email이 설정되어 있지 않아 이메일은 생략되었습니다."

    ok, err = self._send_email_with_attachments(
        subject="[PX Agent] 설교 자료",
        body="자동 생성된 설교 자료를 첨부합니다.",
        to_email=recipient,
        attachments=list(files.values()),
        smtp_host=smtp_host,
        smtp_port=smtp_port,
        smtp_user=smtp_user,
        smtp_pass=smtp_pass,
        from_email=from_email,
        # Port 465 means implicit TLS; every other port is upgraded with STARTTLS
        use_tls=(smtp_port != 465),
    )
    return "\n이메일로도 발송했습니다." if ok else f"\n이메일 발송 실패: {err}"
# ----------------- 퍼블릭 엔트리 -----------------
@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter(1, 3))
def run(self, user_text: str) -> AgentResult:
    """Public entry point: process one user message.

    Delegates to the YouTube workflow; when that does not apply, replies
    with a request for a YouTube link. The ``retry`` decorator is
    configured to stop after three attempts.
    """
    handled = self._maybe_youtube_workflow(user_text)
    if not handled:
        prompt = "예배용 YouTube 링크를 알려주세요. (예: https://youtu.be/XXXXXXXXXXX)"
        self._remember(user_text, prompt)
        return AgentResult(output_text=prompt, tool_runs=[])
    return handled
# ----------------- 요약/생성 -----------------
def _summarize_transcript(self, transcript: str) -> str:
if not transcript or len(transcript) < 1200:
return transcript
sys = (
"다음 설교 전사문을 7,000~10,000자 정도로 아주 자세한 한국어 개요로 응집하세요. "
"핵심 논지/흐름/중요 인용만 유지하고 반복은 줄이세요."
)
resp = self.client.responses.create(
model=self._model,
input=[
{"role": "system", "content": [{"type": "input_text", "text": sys}]},
{"role": "user", "content": [{"type": "input_text", "text": transcript}]},
],
max_output_tokens=min(self._max_out, 3500),
)
return self._collect_text(resp)
# ▲▲▲ (참고: 한글 1만 자는 대략 3000~3500토큰 정도 됩니다)
def _compose_px_doc(self, transcript: str, opts: Optional[GenOptions] = None) -> str:
opts = opts or GenOptions()
brief = self._summarize_transcript(transcript)
parts = []
if "시작 기도문" in opts.sections:
a, b = opts.start_prayer_lines; parts.append(f"1) 시작 기도문 ({a}–{b}줄)")
if "아이스브레이크" in opts.sections:
parts.append(f"2) 아이스브레이크 ({opts.icebreakers_count}문항, 누구나 쉽게)")
if "본문 요약" in opts.sections:
lo, hi = opts.summary_chars; parts.append(f"3) 본문 요약 (~{lo}–{hi}자, 수필체, 중복 최소화, 흐름 유지, 소제목 3–5 허용)")
if "본문 이해 질문" in opts.sections:
parts.append(f"4) 본문 이해 질문 (정확히 {opts.understanding_q}문항)")
if "적용 질문" in opts.sections:
parts.append(f"5) 적용 질문 (정확히 {opts.application_q}문항)")
if "끝나는 기도문" in opts.sections:
a, b = opts.end_prayer_lines; parts.append(f"6) 끝나는 기도문 ({a}–{b}줄)")
order_str = "\n".join(parts)
length_hint_msg = {"short":"- 전반적으로 간결하게.\n","normal":"","long":"- 설명과 예시를 덧붙여 풍성하게.\n"}[opts.length_hint]
sys = (
"너는 'px agent'이며 한국어로 고품질 문서를 생성한다. 오직 예배 설교 전사(Transcript)만을 근거로 작성하라.\n"
"다음 ‘선택된 섹션’만 출력한다. 각 섹션은 Markdown 헤더(#)로 구분하라.\n"
f"{order_str}\n"
"- 인용은 1–2문장 이내로만.\n"
"- 과도한 상투어를 피하고 실제 예배 톤을 유지하라.\n"
"- '본문 요약' 섹션은 중간 소제목이나 개조식 나열 없이, 소그룹 인도자가 사람들 앞에서 자연스럽게 낭독할 수 있는 부드러운 구어체 설교문(줄글) 형식으로 연결하여 작성하라.\n"
"- 각 섹션 헤더는 정확히 위 제목을 포함(예: '# 적용 질문').\n"
+ length_hint_msg
)
seed = transcript[:4000] if transcript else ""
user = f"[요약본]\n{brief}\n[원문 일부]\n{seed}"
resp = self.client.responses.create(
model=self._model,
input=[
{"role": "system", "content": [{"type": "input_text", "text": sys}]},
{"role": "user", "content": [{"type": "input_text", "text": user}]},
],
max_output_tokens=self._max_out,
)
text = self._collect_text(resp)
text = self._postfix_fill_custom(text, opts)
return text
# ----------------- 사후 보강 -----------------
def _postfix_fill_custom(self, text: str, opts: GenOptions) -> str:
t = text
def _ensure_n(section: str, n: int, prefix: str, base: str) -> str:
pattern = rf"(#\s*(?:\d+\)\s*)?{section}.*?)(?=\n#\s|\Z)"
m = re.search(pattern, base, flags=re.S | re.I)
if not m: return base
block = m.group(1)
items = re.findall(r"(?m)^\s*[-*]\s+.+|^\s*\d+[\.)]\s+.+", block)
if len(items) >= n: return base
need = n - len(items)
adds = [f"- {prefix} #{len(items)+i+1}" for i in range(need)]
if not block.endswith("\n\n"):
block = block.rstrip() + "\n\n"
new_block = block + "\n".join(adds) + "\n"
return base[:m.start(1)] + new_block + base[m.end(1):]
#if "본문 이해 질문" in opts.sections:
# t = _ensure_n("본문 이해 질문", opts.understanding_q, "설교 핵심을 확인하기 위한 보충 질문을 작성해 보세요", t)
#if "적용 질문" in opts.sections:
# t = _ensure_n("적용 질문", opts.application_q, "이번 주에 실제로 적용해 볼 행동을 구체적으로 정해 보세요", t)
if "끝나는 기도문" in opts.sections:
t = self._ensure_ending_prayer_length_custom(t, lines_range=opts.end_prayer_lines)
if "시작 기도문" in opts.sections:
t = self._ensure_starting_prayer_length_custom(t, lines_range=opts.start_prayer_lines)
return t
def _ensure_starting_prayer_length_custom(self, text: str, lines_range: Tuple[int, int]) -> str:
m = re.search(r"(#\s*(?:\d+\)\s*)?시작 기도문[^\n]*\n)(.*?)(?=\n#\s|\Z)", text, flags=re.S | re.I)
if not m: return text
header, body = m.group(1), m.group(2)
lines = [ln for ln in body.splitlines() if ln.strip()]
lo, hi = lines_range
if len(lines) >= lo: return text
sys = (f"아래 '시작 기도문' 본문을 {lo}–{hi}줄로 자연스럽게 보강하세요. 톤은 온화하고 구체적이며 과한 상투어를 피하세요. 헤더는 생성하지 마세요.")
user = f"[본문]\n{body}\n[/본문]"
try:
resp = self.client.responses.create(
model=self._model,
input=[
{"role": "system", "content": [{"type": "input_text", "text": sys}]},
{"role": "user", "content": [{"type": "input_text", "text": user}]},
],
max_output_tokens=min(self._max_out, 600),
)
new_body = self._collect_text(resp).strip()
if not new_body.strip() or "(No text output)" in new_body: return text
return text[:m.start()] + header + new_body + text[m.end():]
except Exception:
return text
def _ensure_ending_prayer_length_custom(self, text: str, lines_range: Tuple[int, int]) -> str:
m = re.search(r"(#\s*(?:\d+\)\s*)?끝나는 기도문[^\n]*\n)(.*?)(?=\n#\s|\Z)", text, flags=re.S | re.I)
if not m: return text
header, body = m.group(1), m.group(2)
lines = [ln for ln in body.splitlines() if ln.strip()]
lo, hi = lines_range
if len(lines) >= lo: return text
sys = (f"아래 '끝나는 기도문' 본문을 {lo}–{hi}줄의 예배 마무리 기도문으로 자연스럽게 보강하세요. 톤은 온화하고 구체적이며 과한 상투어를 피하세요. 헤더는 생성하지 마세요.")
user = f"[본문]\n{body}\n[/본문]"
try:
resp = self.client.responses.create(
model=self._model,
input=[
{"role": "system", "content": [{"type": "input_text", "text": sys}]},
{"role": "user", "content": [{"type": "input_text", "text": user}]},
],
max_output_tokens=min(self._max_out, 600),
)
new_body = self._collect_text(resp).strip()
if not new_body.strip() or "(No text output)" in new_body: return text
return text[:m.start()] + header + new_body + text[m.end():]
except Exception:
return text
# ----------------- 공통 유틸 -----------------
def _collect_text(self, resp) -> str:
out = getattr(resp, "output_text", "") or ""
if out: return out.strip()
collected: List[str] = []
for item in getattr(resp, "output", []) or []:
if getattr(item, "type", None) == "message":
for p in getattr(item, "content", []) or []:
if getattr(p, "type", None) in {"text", "output_text"}:
collected.append(getattr(p, "text", "") or "")
return ("\n".join(collected).strip()) or "(No text output from model)"
def _remember(self, user_text: str, assistant_text: str) -> None:
    """Record one user/assistant exchange and refresh the rolling summary.

    Both texts are passed through ``redact_pii`` before being stored.
    """
    for role, raw in (("user", user_text), ("assistant", assistant_text)):
        self.memory.add(role, redact_pii(raw))
    self._summarize_memory()
def _summarize_memory(self) -> None:
window = self.memory.window(12)
text = "\n".join([f"{m.role}: {m.content}" for m in window])
resp = self.client.responses.create(
model=self._model,
input=[
{
"role": "system",
"content": [{"type": "input_text",
"text": "Summarize the conversation succinctly for future context. Korean user; keep details that matter for tasks."}],
},
{"role": "user", "content": [{"type": "input_text", "text": text}]},
],
max_output_tokens=min(self._mem_sum_tokens, 1000),
)
summary = self._collect_text(resp)
if summary and summary != "(No text output from model)":
self.memory.summary = summary
agent/config.py
# agent/config.py
from pydantic import BaseModel
import os
from dotenv import load_dotenv
load_dotenv() # .env 로드
def _parse_csv_env(name: str, default_list: list[str]) -> list[str]:
raw = os.getenv(name, "")
if not raw:
return default_list
# 쉼표 분리 + 공백 제거 + 빈 항목 제거
return [x.strip() for x in raw.split(",") if x.strip()]
class Settings(BaseModel):
    """Application settings with defaults taken from environment variables.

    Each default is computed with ``os.getenv`` when this class body is
    executed, so the environment (including ``.env``, loaded above) must be
    in place before this module is imported.
    """
    # OpenAI
    openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
    model: str = os.getenv("OPENAI_MODEL", "gpt-5")
    reasoning_effort: str = os.getenv("REASONING_EFFORT", "medium")
    # Output token budget / memory-summary token budget
    max_output_tokens: int = int(os.getenv("MAX_OUTPUT_TOKENS", "3200"))
    memory_summary_tokens: int = int(os.getenv("MEMORY_SUMMARY_TOKENS", "800"))
    # Allowlist of usable tools (default: both STT and transcript)
    tool_allowlist: list[str] = _parse_csv_env(
        "TOOL_ALLOWLIST",
        ["youtube_stt", "youtube_transcript"]
    )
    # YouTube STT options
    youtube_chunk_duration_sec: int = int(os.getenv("YT_CHUNK_SEC", "600"))
    youtube_overlap_sec: int = int(os.getenv("YT_OVERLAP_SEC", "1"))
    # Email (optional)
    recipient_email: str = os.getenv("RECIPIENT_EMAIL", "")  # recipient address
    smtp_host: str = os.getenv("SMTP_HOST", "")  # e.g., smtp.gmail.com
    smtp_port: int = int(os.getenv("SMTP_PORT", "587"))  # TLS 587 / SSL 465
    smtp_user: str = os.getenv("SMTP_USER", "")  # login (usually the email address)
    smtp_pass: str = os.getenv("SMTP_PASS", "")  # app password / SMTP password
    from_email: str = os.getenv("FROM_EMAIL", "")  # displayed sender (smtp_user is used when unset)


# Module-level settings instance imported by the other modules
settings = Settings()
agent/memory.py
from __future__ import annotations
from typing import List, Dict
from dataclasses import dataclass, field
@dataclass
class Message:
    """A single conversation entry."""

    # 'user' | 'assistant' | 'tool'
    role: str
    # Message text
    content: str
@dataclass
class MemoryStore:
    """Minimal in-memory conversation store with a rolling summary."""

    messages: List[Message] = field(default_factory=list)
    summary: str = ""

    def add(self, role: str, content: str) -> None:
        """Append a message to the history."""
        entry = Message(role=role, content=content)
        self.messages.append(entry)

    def window(self, limit: int = 10) -> List[Message]:
        """Return the trailing ``limit`` messages."""
        return self.messages[-limit:]

    def to_responses_input(self, summary_first: bool = True) -> List[Dict]:
        """Build a role/content dict list from the last 20 messages.

        When ``summary_first`` is true and a summary exists, it is emitted
        first as a system message.
        """
        head: List[Dict] = []
        if summary_first and self.summary:
            # Plain concatenation keeps newlines in the summary intact
            head = [{"role": "system", "content": "[Conversation summary]\n" + self.summary}]
        tail = [{"role": m.role, "content": m.content} for m in self.window(20)]
        return head + tail
agent/safety.py
import re
from typing import List
from .config import settings
# Patterns whose matches are replaced by "[REDACTED]"
PII_PATTERNS = [
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),  # SSN-like
    re.compile(r"\b\d{16}\b"),  # naive card number
]


def redact_pii(text: str) -> str:
    """Return ``text`` with every ``PII_PATTERNS`` match replaced by ``[REDACTED]``."""
    for pattern in PII_PATTERNS:
        text = pattern.sub("[REDACTED]", text)
    return text
def is_tool_allowed(tool_name: str) -> bool:
    """Return whether ``tool_name`` is listed in ``settings.tool_allowlist``."""
    allowlist = settings.tool_allowlist
    return tool_name in allowlist
agent/tool_router.py
# agent/tool_router.py
from __future__ import annotations
from typing import List, Tuple, Dict, Any, Callable, Optional
import importlib
import traceback
from .config import settings # allowlist를 적용하기 위해 필요
# 각 툴은 (spec_dict, run_callable) 형태로 등록
# run_callable 서명: (args: Dict[str, Any]) -> Dict[str, Any]
_REGISTRY: List[Tuple[Dict[str, Any], Callable[[Dict[str, Any]], Dict[str, Any]]]] = []


def _safe(spec: Dict[str, Any], fn: Callable[[Dict[str, Any]], Dict[str, Any]]) -> None:
    """Validate a tool spec and add it to the registry.

    The spec must carry a non-empty string ``name``. When the allowlist in
    settings is non-empty, tools not named in it are skipped silently; an
    empty allowlist admits every tool.

    Raises:
        AssertionError: If ``name`` is missing or is not a string.
    """
    name = spec.get("name")
    if not (name and isinstance(name, str)):
        raise AssertionError("Tool spec must include a string 'name' field.")

    allow = settings.tool_allowlist or []
    if not allow or name in allow:
        _REGISTRY.append((spec, fn))
def _load_tool(module_path: str, expected_name: Optional[str] = None) -> None:
    """Import a tool module and register its ``tool_spec()`` / ``run`` pair.

    Args:
        module_path: Dotted path of the module to import.
        expected_name: When given, must equal the ``name`` in the spec.

    Raises:
        AssertionError: If the module lacks a callable ``tool_spec`` or
            ``run``, or the spec name differs from ``expected_name``.
    """
    mod = importlib.import_module(module_path)

    required = (
        ("tool_spec", f"{module_path} must define a callable tool_spec()."),
        ("run", f"{module_path} must define a callable run(args: dict)."),
    )
    for attr, complaint in required:
        if not callable(getattr(mod, attr, None)):
            raise AssertionError(complaint)

    spec = mod.tool_spec()
    actual_name = spec.get("name")
    if expected_name and actual_name != expected_name:
        raise AssertionError(f"Tool name mismatch: {expected_name} vs {actual_name}")
    _safe(spec, mod.run)
def register() -> None:
    """(Re)build the tool registry from the supported tool catalog.

    With an empty allowlist every catalog entry is attempted; otherwise
    only the allowlisted names are. A tool that fails to load does not
    stop the others, but the failure is logged with its traceback.
    Swallowing it silently would leave only a later "Tool not found"
    message and no hint that, for example, a dependency is missing.
    """
    import logging

    _REGISTRY.clear()

    # Supported tools: (module path, expected spec name)
    catalog: List[Tuple[str, str]] = [
        ("agent.tools.youtube_transcript", "youtube_transcript"),
        ("agent.tools.youtube_stt", "youtube_stt"),
    ]

    allow = settings.tool_allowlist or []
    for module_path, expected in catalog:
        if allow and expected not in allow:
            # An allowlist is set and does not name this tool
            continue
        try:
            _load_tool(module_path, expected_name=expected)
        except Exception:
            logging.getLogger(__name__).warning(
                "Failed to load tool %r from %s", expected, module_path, exc_info=True
            )
def tool_specs() -> List[Dict[str, Any]]:
    """Return the specs of all registered tools, registering first when the registry is empty."""
    if len(_REGISTRY) == 0:
        register()
    specs: List[Dict[str, Any]] = []
    for spec, _fn in _REGISTRY:
        specs.append(spec)
    return specs
def _find_tool(name: str) -> Optional[Tuple[Dict[str, Any], Callable[[Dict[str, Any]], Dict[str, Any]]]]:
    """Return the ``(spec, run)`` pair registered under ``name``, or ``None``.

    Registration is triggered first when the registry is empty.
    """
    if len(_REGISTRY) == 0:
        register()
    matches = (entry for entry in _REGISTRY if entry[0].get("name") == name)
    found = next(matches, None)
    if found is None:
        return None
    spec, fn = found
    return spec, fn
def run_tool(name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Run a registered tool and return its result in a uniform envelope.

    Envelope:
      * success: ``{"ok": True, "result": <tool_result>, "error": None}``
      * failure: ``{"ok": False, "result": None, "error": "<message>"}``

    A tool return value that already looks like the envelope is passed
    through unchanged. Exceptions are converted into a failure envelope.
    """

    def failure(message: str) -> Dict[str, Any]:
        return {"ok": False, "result": None, "error": message}

    try:
        entry = _find_tool(name)
        if not entry:
            return failure(f"Tool not found or not allowed: {name}")
        spec, fn = entry

        # Last line of defence in case the allowlist changed at runtime
        allow = settings.tool_allowlist or []
        if allow and spec.get("name") not in allow:
            return failure(f"Tool not allowed by settings: {name}")

        raw = fn(args)

        already_wrapped = (
            isinstance(raw, dict)
            and "ok" in raw
            and ("result" in raw or "error" in raw)
        )
        if already_wrapped:
            return raw
        return {"ok": True, "result": raw, "error": None}
    except Exception as e:
        # Turn the exception into a message instead of propagating it
        return failure(f"{type(e).__name__}: {e}")
agent/tools/youtube_transcript.py
# agent/tools/youtube_transcript.py
from __future__ import annotations
from typing import Dict, Any, List, Optional
import re
from urllib.parse import urlparse, parse_qs
# NEW: 안전한 임포트(패키지 미설치시 깔끔히 에러 반환)
try:
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
_YT_AVAILABLE = True
except Exception:
YouTubeTranscriptApi = None # type: ignore
TranscriptsDisabled = NoTranscriptFound = Exception # type: ignore
_YT_AVAILABLE = False
_YT_ID_RE = re.compile(r"^[A-Za-z0-9_-]{11}$")
def _extract_video_id(url: str) -> Optional[str]:
# ... (사용하신 robust extractor 그대로) ...
u = urlparse(url)
host = (u.netloc or "").lower()
path = u.path or ""
qs = parse_qs(u.query or "")
vvals = qs.get("v")
if vvals:
vid = vvals[0]
if _YT_ID_RE.match(vid):
return vid
if host.endswith("youtu.be"):
seg = path.strip("/").split("/")
if seg and _YT_ID_RE.match(seg[0]):
return seg[0]
segs = [p for p in path.split("/") if p]
if len(segs) >= 2 and segs[0] in {"live", "embed", "shorts"}:
cand = segs[1]
if _YT_ID_RE.match(cand):
return cand
if segs and _YT_ID_RE.match(segs[-1]):
return segs[-1]
m = re.search(r"(?:v=|/live/|/embed/|/shorts/|youtu\.be/)([A-Za-z0-9_-]{11})", url)
if m:
return m.group(1)
return None
def _flatten_transcript(items: List[Dict[str, Any]]) -> str:
parts, buf = [], []
for it in items:
t = (it.get("text") or "").strip()
if not t:
continue
if t.startswith("[") and t.endswith("]"):
continue
buf.append(t)
if len(" ".join(buf)) > 800:
parts.append(" ".join(buf)); buf = []
if buf:
parts.append(" ".join(buf))
return "\n".join(parts).strip()
def tool_spec() -> Dict[str, Any]:
    """Describe the ``youtube_transcript`` tool: name, description and input schema.

    A fresh dict is built on every call. Only ``url`` is required; unknown
    properties are disallowed by the schema.
    """
    properties: Dict[str, Any] = {
        "url": {"type": "string", "description": "YouTube video URL"},
        "prefer_langs": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Preferred languages, in order. e.g., ['ko', 'en']",
            "default": ["ko", "en"],
        },
        "allow_translate": {
            "type": "boolean",
            "description": "If no direct transcript in preferred langs, allow YouTube translation.",
            "default": True,
        },
    }
    input_schema: Dict[str, Any] = {
        "type": "object",
        "properties": properties,
        "required": ["url"],
        "additionalProperties": False,
    }
    spec: Dict[str, Any] = {
        "name": "youtube_transcript",
        "description": "Fetch transcript text from a YouTube URL (ko/en preferred). Returns plain text.",
        "input_schema": input_schema,
    }
    return spec
def _fetch_transcript_items(transcript: Any) -> Any:
    """Fetch *transcript* and return its snippets, as plain dicts when the library offers that."""
    fetched = transcript.fetch()
    # Newer youtube-transcript-api returns an object exposing to_raw_data();
    # older versions already return a list of dicts.
    to_raw = getattr(fetched, "to_raw_data", None)
    return to_raw() if callable(to_raw) else fetched


def run(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch the transcript of a YouTube video as plain text.

    Args:
        args: Tool arguments.
            - ``url`` (required): YouTube video URL.
            - ``prefer_langs``: language codes in priority order
              (default ``["ko", "en"]``); a single string is accepted too.
            - ``allow_translate``: when no transcript exists in a preferred
              language, try translating any available transcript (default True).

    Returns:
        On success ``{"ok": True, "result": {"video_id", "lang", "text",
        "source"}}`` where ``source`` is ``"direct"``, ``"translated"`` or
        ``"direct_legacy"``. On failure ``{"ok": False, "error": <message>}``.
    """
    if not _YT_AVAILABLE:
        return {"ok": False, "error": "youtube-transcript-api not installed."}

    url = args.get("url") or ""
    prefer_langs = args.get("prefer_langs") or ["ko", "en"]
    if isinstance(prefer_langs, str):
        # A bare "ko" would otherwise be iterated character by character.
        prefer_langs = [prefer_langs]
    allow_translate = bool(args.get("allow_translate", True))

    try:
        vid = _extract_video_id(url) if isinstance(url, str) else None
    except ValueError:
        # Malformed URL rejected by the URL parser.
        vid = None
    if not vid:
        return {"ok": False, "error": "Invalid YouTube URL (cannot extract video id)."}

    try:
        try:
            # Prefer the instance API of youtube-transcript-api 1.x; fall back
            # to the static list_transcripts() of earlier releases.
            if hasattr(YouTubeTranscriptApi, "list"):
                tl = YouTubeTranscriptApi().list(vid)
            else:
                tl = YouTubeTranscriptApi.list_transcripts(vid)

            # 1) Transcript directly available in a preferred language.
            for lang in prefer_langs:
                try:
                    tr = tl.find_transcript([lang])
                    text = _flatten_transcript(_fetch_transcript_items(tr))
                    if text:
                        return {"ok": True, "result": {"video_id": vid, "lang": lang, "text": text, "source": "direct"}}
                except Exception:
                    # Best effort: a miss for one language must not stop the search.
                    pass

            # 2) Otherwise translate any available transcript, if allowed.
            if allow_translate:
                for lang in prefer_langs:
                    for candidate in tl:
                        try:
                            text = _flatten_transcript(_fetch_transcript_items(candidate.translate(lang)))
                            if text:
                                return {"ok": True, "result": {"video_id": vid, "lang": lang, "text": text, "source": "translated"}}
                        except Exception:
                            continue
        except AttributeError:
            # Very old library versions: only get_transcript() exists.
            for lang in prefer_langs:
                try:
                    items = YouTubeTranscriptApi.get_transcript(vid, languages=[lang])
                    text = _flatten_transcript(items)
                    if text:
                        return {"ok": True, "result": {"video_id": vid, "lang": lang, "text": text, "source": "direct_legacy"}}
                except Exception:
                    pass
        return {"ok": False, "error": "No transcript available in preferred langs (and translation failed)."}
    except TranscriptsDisabled:
        return {"ok": False, "error": "Transcripts disabled for this video."}
    except NoTranscriptFound:
        return {"ok": False, "error": "No transcript found for this video."}
    except Exception as e:
        return {"ok": False, "error": f"Unexpected error: {e}"}
agent/tools/youtube_stt.py
# agent/tools/youtube_stt.py
from __future__ import annotations
from typing import Dict, Any, List, Optional, Tuple
import os
import re
import shutil
import tempfile
import subprocess
import openai
# -----------------------------
# Utilities
# -----------------------------
def _which(cmd: str) -> Optional[str]:
return shutil.which(cmd)
def _get_ffmpeg_paths() -> Tuple[str, str]:
ffmpeg = os.getenv("FFMPEG_PATH") or _which("ffmpeg")
ffprobe = os.getenv("FFPROBE_PATH") or _which("ffprobe")
if not ffmpeg or not ffprobe:
raise RuntimeError("ffmpeg/ffprobe not found. Set FFMPEG_PATH/FFPROBE_PATH or add to PATH.")
return ffmpeg, ffprobe
def _run(cmd: List[str]) -> subprocess.CompletedProcess:
return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
def _probe_duration(ffprobe: str, media_path: str) -> float:
    """Ask ffprobe for the container duration of *media_path*.

    Args:
        ffprobe: Path to the ffprobe executable.
        media_path: Media file to inspect.

    Returns:
        The value ffprobe prints for ``format=duration`` as a float, or
        ``0.0`` when that output cannot be parsed as a number.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
    """
    probe = _run([
        ffprobe,
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        media_path,
    ])
    reported = probe.stdout.decode("utf-8", "ignore").strip()
    try:
        duration = float(reported)
    except Exception:
        duration = 0.0
    return duration
def _hhmmss_to_seconds(s: Optional[str]) -> Optional[int]:
if not s:
return None
try:
h, m, sec = s.strip().split(":")
return int(h) * 3600 + int(m) * 60 + int(sec)
except Exception:
return None
def _seconds_to_hhmmss(x: int) -> str:
h = x // 3600
m = (x % 3600) // 60
s = x % 60
return f"{h:02d}:{m:02d}:{s:02d}"
def _yt_download_audio(url: str, outdir: str) -> str:
    """Download the best available audio of *url* into *outdir*.

    Uses yt-dlp (falling back to youtube-dl when only that is on PATH) with
    the options the original author chose to cope with SABR streaming.

    Args:
        url: Video URL. Passed after a ``--`` terminator so a value starting
            with ``-`` cannot be interpreted as a downloader option.
        outdir: Existing directory that receives the download.

    Returns:
        Path of the largest downloaded file with a known media extension.

    Raises:
        RuntimeError: If no downloader is installed or no media file was produced.
        subprocess.CalledProcessError: If the downloader exits with an error.
    """
    ytdlp = _which("yt-dlp") or _which("youtube-dl")
    if not ytdlp:
        raise RuntimeError("yt-dlp (or youtube-dl) is not installed or not in PATH.")

    outtpl = os.path.join(outdir, "%(id)s.%(ext)s")
    cmd = [
        ytdlp,
        "-N", "8",
        "--no-check-formats",
        "--hls-prefer-ffmpeg",
        "-f", "bestaudio/best",
        "--no-playlist",
        "-o", outtpl,
        "--",  # end of options: the URL comes from tool input and must not be parsed as a flag
        url,
    ]
    _run(cmd)

    media_exts = (".webm", ".m4a", ".mp4", ".mp3")
    candidates = []
    for fn in os.listdir(outdir):
        p = os.path.join(outdir, fn)
        if os.path.isfile(p) and fn.lower().endswith(media_exts):
            candidates.append((os.path.getsize(p), p))
    if not candidates:
        raise RuntimeError("Audio download failed: no audio file found.")
    # Largest file wins (ties broken by path, as with a descending sort).
    return max(candidates)[1]
def _ensure_wav(ffmpeg: str, in_path: str, out_path: str) -> None:
    """Transcode *in_path* to a 16 kHz, mono, 16-bit PCM WAV at *out_path*.

    Normalizing the audio this way keeps the chunks later sent to STT small.
    An existing *out_path* is overwritten (``-y``); video streams are dropped.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    input_args = ["-y", "-i", in_path]
    audio_args = ["-ac", "1", "-ar", "16000", "-vn", "-c:a", "pcm_s16le"]
    _run([ffmpeg, *input_args, *audio_args, out_path])
def _slice_wav(ffmpeg: str, wav_path: str, start: int, end: int, out_path: str) -> None:
    """Cut the ``[start, end)`` second range of *wav_path* into *out_path*.

    The slice is written as 16 kHz mono 16-bit PCM and is at least one second
    long, even when ``end - start`` is smaller. An existing *out_path* is
    overwritten.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    length = end - start
    if length < 1:
        length = 1
    # -ss / -t are given before -i, so they apply to the input file.
    seek_args = ["-ss", str(start), "-t", str(length)]
    audio_args = ["-ac", "1", "-ar", "16000", "-vn", "-c:a", "pcm_s16le"]
    _run([ffmpeg, "-y", *seek_args, "-i", wav_path, *audio_args, out_path])
def _chunk_offsets(total: int, chunk_sec: int, overlap_sec: int = 0) -> List[Tuple[int, int]]:
if chunk_sec <= 1:
return [(0, total)]
offs = []
step = max(1, chunk_sec - max(0, overlap_sec))
cur = 0
while cur < total:
end = min(total, cur + chunk_sec)
offs.append((cur, end))
if end >= total:
break
cur += step
return offs
# -----------------------------
# OpenAI STT
# -----------------------------
def _openai_client() -> openai.OpenAI:
api_key = os.getenv("OPENAI_API_KEY", "")
if not api_key:
raise RuntimeError("OPENAI_API_KEY is not set.")
return openai.OpenAI(api_key=api_key)
def _transcribe_file(client: openai.OpenAI, file_path: str, model: str, language: Optional[str]) -> str:
with open(file_path, "rb") as f:
resp = client.audio.transcriptions.create(
model=model,
file=f,
language=language or None,
)
return getattr(resp, "text", "") or ""
# -----------------------------
# Public Tool API
# -----------------------------
def tool_spec() -> Dict[str, Any]:
    """Describe the ``youtube_stt`` tool: name, description and input schema.

    A fresh dict is built on every call. Only ``url`` is required; unknown
    properties are disallowed by the schema.
    """
    properties: Dict[str, Any] = {
        "url": {"type": "string", "description": "YouTube video URL"},
        "lang_hint": {"type": "string", "description": "Language hint (e.g., 'ko').", "default": "ko"},
        "sermon_start": {"type": "string", "description": "Start time HH:MM:SS (optional, default 00:00:00)"},
        "sermon_end": {"type": "string", "description": "End time HH:MM:SS (optional, default video end)"},
        "chunk_duration_sec": {"type": "integer", "description": "Chunk seconds for upload.", "default": 600},
        "overlap_sec": {"type": "integer", "description": "Overlap seconds between chunks.", "default": 1},
    }
    input_schema: Dict[str, Any] = {
        "type": "object",
        "properties": properties,
        "required": ["url"],
        "additionalProperties": False,
    }
    spec: Dict[str, Any] = {
        "name": "youtube_stt",
        "description": "Download YouTube audio and transcribe only the requested sermon segment.",
        "input_schema": input_schema,
    }
    return spec
def run(args: Dict[str, Any]) -> Dict[str, Any]:
    """Download a YouTube video's audio and transcribe the requested time window.

    There is no automatic sermon detection: the window comes from the
    arguments only.

    Args:
        args: Tool arguments.
            - ``url`` (required): YouTube video URL.
            - ``lang_hint``: language hint for STT (default ``"ko"``).
            - ``sermon_start``: window start; 0 seconds when omitted.
            - ``sermon_end``: window end; end of the normalized WAV when omitted.
            - ``chunk_duration_sec``: seconds per uploaded chunk (default 600).
            - ``overlap_sec``: seconds shared by consecutive chunks
              (default 1; an explicit 0 disables overlap).

    Returns:
        On success ``{"ok": True, "result": {"text", "used_range", "segments"}}``,
        where ``text`` is the concatenation of the per-chunk transcripts, each
        preceded by its absolute ``[start–end]`` tag. On failure
        ``{"ok": False, "error": <message>}``. All argument validation happens
        before anything is downloaded.
    """
    url = args.get("url", "")
    if not url:
        return {"ok": False, "error": "Missing 'url'."}

    lang_hint = (args.get("lang_hint") or "ko").strip()

    # Numeric options. `x or default` would turn an explicit overlap of 0 into
    # the default, so only None / "" count as "not provided" for the overlap.
    raw_chunk = args.get("chunk_duration_sec")
    raw_overlap = args.get("overlap_sec")
    try:
        chunk_sec = int(raw_chunk) if raw_chunk else 600
        overlap_sec = 1 if raw_overlap in (None, "") else max(0, int(raw_overlap))
    except (TypeError, ValueError):
        return {"ok": False, "error": "chunk_duration_sec and overlap_sec must be integers."}

    # Time window. A value that was supplied but cannot be parsed is an error;
    # silently falling back would transcribe (and bill) the whole video.
    raw_start = args.get("sermon_start")
    raw_end = args.get("sermon_end")
    start_s = _hhmmss_to_seconds(raw_start)
    end_s = _hhmmss_to_seconds(raw_end)
    if raw_start and start_s is None:
        return {"ok": False, "error": f"Invalid sermon_start (expected HH:MM:SS): {raw_start!r}"}
    if raw_end and end_s is None:
        return {"ok": False, "error": f"Invalid sermon_end (expected HH:MM:SS): {raw_end!r}"}
    used_start = max(0, start_s or 0)
    if end_s is not None and end_s <= used_start + 2:
        # Same verdict the post-download check would reach, without the download.
        return {"ok": False, "error": "Invalid time window. Please check start/end times."}

    transcribe_model = os.getenv("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")

    try:
        ffmpeg, ffprobe = _get_ffmpeg_paths()
    except Exception as e:
        return {"ok": False, "error": str(e)}

    try:
        client = _openai_client()
    except Exception as e:
        return {"ok": False, "error": f"OpenAI client error: {e}"}

    work = tempfile.mkdtemp(prefix="ytstt_")
    try:
        # 1) Download the audio track.
        audio_path = _yt_download_audio(url, work)

        # 2) Normalize to WAV and measure it.
        norm_wav = os.path.join(work, "audio_16k.wav")
        _ensure_wav(ffmpeg, audio_path, norm_wav)
        total_sec = int(round(_probe_duration(ffprobe, norm_wav)))

        # 3) Clamp the window to the actual duration.
        used_end = min(total_sec, end_s if end_s is not None else total_sec)
        if used_end <= used_start + 2:
            return {"ok": False, "error": "Invalid time window. Please check start/end times."}

        # 4) Extract the requested window.
        sliced_path = os.path.join(work, "sermon_window.wav")
        _slice_wav(ffmpeg, norm_wav, used_start, used_end, sliced_path)
        win_len = int(round(_probe_duration(ffprobe, sliced_path)))
        if win_len <= 1:
            return {"ok": False, "error": "Chosen window is too short to transcribe."}

        # 5) Split into chunks and transcribe each one.
        offs = _chunk_offsets(win_len, chunk_sec, overlap_sec=overlap_sec)
        out_texts: List[str] = []
        segments_meta: List[Dict[str, Any]] = []

        for i, (a, b) in enumerate(offs, start=1):
            chunk_path = os.path.join(work, f"chunk_{i:03d}.wav")
            _slice_wav(ffmpeg, sliced_path, a, b, chunk_path)
            try:
                text = _transcribe_file(client, chunk_path, transcribe_model, lang_hint or None)
            except Exception as e:
                return {"ok": False, "error": f"stt failed on chunk {i}: {e}"}

            # Chunk offsets are relative to the window; report absolute video times.
            abs_start = used_start + a
            abs_end = used_start + b
            tag = f"[{_seconds_to_hhmmss(abs_start)}–{_seconds_to_hhmmss(abs_end)}]"
            out_texts.append(f"{tag}\n{text.strip()}\n")
            segments_meta.append({
                "index": i,
                "local_start": a,
                "local_end": b,
                "abs_start": _seconds_to_hhmmss(abs_start),
                "abs_end": _seconds_to_hhmmss(abs_end),
                "chars": len(text or ""),
            })

        final_text = ("\n".join(out_texts)).strip()
        if not final_text:
            return {"ok": False, "error": "Empty transcription result."}

        return {
            "ok": True,
            "result": {
                "text": final_text,
                "used_range": {
                    "start_hhmmss": _seconds_to_hhmmss(used_start),
                    "end_hhmmss": _seconds_to_hhmmss(used_end),
                },
                "segments": segments_meta,
            },
        }
    except subprocess.CalledProcessError as e:
        err = (e.stderr or b"").decode("utf-8", "ignore")
        return {"ok": False, "error": f"Command failed: {err.strip() or e}"}
    except Exception as e:
        return {"ok": False, "error": f"stt failed: {e}"}
    finally:
        # Best-effort cleanup; ignore_errors keeps locked files from raising.
        shutil.rmtree(work, ignore_errors=True)